【MediaPipe】MediaPipeの仕様を詳しく調べてみた
カフェチームの山本です。
前回は、3つ以上の手を検出しようしたところ、骨格を3つ以上検出することができませんでした。
今回は、この問題を解決するために、MediaPipeの仕様を詳しく見ていきたいと思います。
(MediaPipeに関連する記事はこちらにまとめてあります。)
公式Docを読む
MediaPipe公式Docの基本概念・サンプルCalculator作成・フレームワークアーキテクチャがとても参考になりました。
基本概念は以前の記事で取り上げたためそちらをご覧ください。サンプルCalculator作成はちょっと細かいので取り上げないこととします。今回は、フレームワークアーキテクチャの内容について、書かれている解説を追いながら要点をまとめていきます。
スケジューリングの仕組み
Each graph has at least one scheduler queue. Each scheduler queue has exactly one executor. Nodes are statically assigned to a queue (and therefore to an executor). By default there is one queue, whose executor is a thread pool with a number of threads based on the system’s capabilities.
- Graphには、スケジューリングキュー(1つ以上、デフォルトは1つ)と、エクセキュータ(1つのみ)がある。
- スケジューリングキューには、Nodeのタスクが格納される
- エクセキュータは、スレッドプールとして働く。タスクにスレッドを割り当てる。
Each node has a scheduling state, which can be not ready, ready, or running. A readiness function determines whether a node is ready to run. This function is invoked at graph initialization, whenever a node finishes running, and whenever the state of a node’s inputs changes.
The readiness function used depends on the type of node. A node with no stream inputs is known as a source node; source nodes are always ready to run, until they tell the framework they have no more data to output, at which point they are closed.
Non-source nodes are ready if they have inputs to process, and if those inputs form a valid input set according to the conditions set by the node’s input policy (discussed below). Most nodes use the default input policy, but some nodes specify a different one.
Note: Because changing the input policy changes the guarantees the calculator’s code can expect from its inputs, it is not generally possible to mix and match calculators with arbitrary input policies. Thus a calculator that uses a special input policy should be written for it, and declare it in its contract.
- Nodeには3つの状態がある:not ready / ready / running
- 状態が読み取られるのは次のとき:Graphの初期化時、Nodeの実行が終了したとき、Nodeの入力が変化したとき
- ソースノード(入力がないNode)の場合、常に ready。フレームワークに終了したことを伝えればCloseされる。
- 非ソースノード(入力をもつNode)の場合、入力があり、Input Policyに合うデータがあれば、readyになる。
非ソースコードの状態は次のような変化をするものと推測されます。最初は not ready。Input Policyの条件にある入力が存在するとき ready に変わる(キューに追加される)。エクセキュータによって処理が始められると running に変わる。終了すると not ready に変わる。
When a node becomes ready, a task is added to the corresponding scheduler queue, which is a priority queue. The priority function is currently fixed, and takes into account static properties of the nodes and their topological sorting within the graph. For example, nodes closer to the output side of the graph have higher priority, while source nodes have the lowest priority.
Each queue is served by an executor, which is responsible for actually running the task by invoking the calculator’s code. Different executors can be provided and configured; this can be used to customize the use of execution resources, e.g. by running certain nodes on lower-priority threads.
- タスクの実行はエクセキュータによって決められる。優先度関数によって実行順序を決める。
このことから、各タスクは並列処理され、実行順はタスクごとにバラバラであることがわかります。
タイムスタンプ
MediaPipe graph execution is decentralized: there is no global clock, and different nodes can process data from different timestamps at the same time. This allows higher throughput via pipelining.
However, time information is very important for many perception workflows. Nodes that receive multiple input streams generally need to coordinate them in some way. For example, an object detector may output a list of boundary rectangles from a frame, and this information may be fed into a rendering node, which should process it together with the original frame.
Therefore, one of the key responsibilities of the MediaPipe framework is to provide input synchronization for nodes. In terms of framework mechanics, the primary role of a timestamp is to serve as a synchronization key.
- (そもそも)MediaPipeは並列処理を行うことで、パイプラインのスループットを向上している
- しかし、処理を順序立てることが必要な場合もある
- そのため、タイムスタンプを同期キーとして利用している
The two objectives of synchronization and determinism underlie several design choices. Notably, the packets pushed into a given stream must have monotonically increasing timestamps: this is not just a useful assumption for many nodes, but it is also relied upon by the synchronization logic. Each stream has a timestamp bound, which is the lowest possible timestamp allowed for a new packet on the stream. When a packet with timestamp
T
arrives, the bound automatically advances toT+1
, reflecting the monotonic requirement. This allows the framework to know for certain that no more packets with timestamp lower thanT
will arrive.
- 各Streamはタイムスタンプバウンドをもっている
- タイムスタンプバウンドは「ストリームに流れうるPacketの、タイムスタンプの最低値」を表している
- 逆にいうと、この値よりも小さい値のタイムスタンプのパケットは流れない
- Tのタイムスタンプを持ったPacketが流れたら、T+1に自動的にインクリメントされる
- タイムスタンプバウンドよりも小さい値のPacketが到着したら、そのStreamは「確定」される(後述)
ちょっと複雑ですが、タイムスタンプバウンドと同じ値のPacketがきたら、そのStreamは「確定」され、Nodeに入力される準備が整うということになると思います。
入力ポリシー(Input Policy)
Synchronization is handled locally on each node, using the input policy specified by the node.
The default input policy, defined by
DefaultInputStreamHandler
, provides deterministic synchronization of inputs, with the following guarantees:
- If packets with the same timestamp are provided on multiple input streams, they will always be processed together regardless of their arrival order in real time.
- Input sets are processed in strictly ascending timestamp order.
- No packets are dropped, and the processing is fully deterministic.
- The node becomes ready to process data as soon as possible given the guarantees above.
- 処理(Packet)の同期は、各ノードにおいて、Input Policyを利用しておこなう。
- DefaultInputStreamHandlerで定義されている、デフォルトのInput Policyでは以下のようなことを保証する
- 各StreamにPacketが到着した実時刻は関係なく、すべてのPacketが同時に処理される
- タイムスタンプの昇順に処理される
- Pakcetは失われない
To explain how it works, we need to introduce the definition of a settled timestamp. We say that a timestamp in a stream is settled if it lower than the timestamp bound. In other words, a timestamp is settled for a stream once the state of the input at that timestamp is irrevocably known: either there is a packet, or there is the certainty that a packet with that timestamp will not arrive.
- Packetのタイムスタンプが、流れるストリームのタイムスタンプバウンドよりも小さいとき、そのタイムスタンプは「確定」したとされる
Given this definition, a calculator with the default input policy is ready if there is a timestamp which is settled across all input streams and contains a packet on at least one input stream. The input policy provides all available packets for a settled timestamp as a single input set to the calculator.
- デフォルトのInput Policyでは、Nodeにおけるすべての入力Streamが「確定」したときに、すべてのパケットを1セットとしてCalculatorに入力する
One consequence of this deterministic behavior is that, for nodes with multiple input streams, there can be a theoretically unbounded wait for a timestamp to be settled, and an unbounded number of packets can be buffered in the meantime. (Consider a node with two input streams, one of which keeps sending packets while the other sends nothing and does not advance the bound.)
- よって、デフォルトのInput PolicyをもつNodeでは、あるStreamが「確定」しても、別のStreamが「確定」しなければ、Packetが渡されず、ずっと待ち状態になってしまう。
Therefore, we also provide for custom input policies: for example, splitting the inputs in different synchronization sets defined by
SyncSetInputStreamHandler
, or avoiding synchronization altogether and processing inputs immediately as they arrive defined byImmediateInputStreamHandler
.
- 上記のような状態を防ぐために、カスタムしたInput Policyがある。
処理フロー制御
There are two main flow control mechanisms. A backpressure mechanism throttles the execution of upstream nodes when the packets buffered on a stream reach a (configurable) limit defined by
CalculatorGraphConfig::max_queue_size
. This mechanism maintains deterministic behavior, and includes a deadlock avoidance system that relaxes configured limits when needed.The second system consists of inserting special nodes which can drop packets according to real-time constraints (typically using custom input policies) defined by
FlowLimiterCalculator
. For example, a common pattern places a flow-control node at the input of a subgraph, with a loopback connection from the final output to the flow-control node. The flow-control node is thus able to keep track of how many timestamps are being processed in the downstream graph, and drop packets if this count hits a (configurable) limit; and since packets are dropped upstream, we avoid the wasted work that would result from partially processing a timestamp and then dropping packets between intermediate stages.
- (処理に時間がかかり、後続の処理が詰まってしまうことを防ぐために)処理を制御する方法は主に2つある
- 1つ目は、Calculatorの設定を変更し、バッファされるPacketの数の上限を変更する
- 2つ目は、いくつかのPacketを捨てる(drop)するCalculatorを、Graphの途中に追加する
まとめ
今回は、MediaPipeの処理制御について、公式Docを詳しく見てみました。前回、最初は手を4つ分検出しているものの、途中から2つ分しか検出処理が走らないようになっていたことから、タイムスタンプ・Input Policy・フロー制御あたりが関係していそうです。
次回は、今回の内容を踏まえ、原因をより詳しく調べ、問題を解決していきたいと思います。
次↓
参考にさせていただいたページ
Framework Architecture(MediaPipe Doc) https://mediapipe.readthedocs.io/en/latest/scheduling_sync.html